
package com.shockweb.configcenter;


import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import com.shockweb.bridge.DataAgreement;
import com.shockweb.bridge.OperationDefine;
import com.shockweb.client.exception.ClientException;
import com.shockweb.client.impl.ConfigCenterClient;
import com.shockweb.configcenter.config.ConfigCenterConfig;
import com.shockweb.configcenter.data.CenterRoot;
import com.shockweb.configcenter.data.IReq;
import com.shockweb.configcenter.data.Req;
import com.shockweb.configcenter.data.ReqGroup;
import com.shockweb.configcenter.data.ReqName;
import com.shockweb.configcenter.data.ReqSync;
import com.shockweb.configcenter.data.ReqValue;
import com.shockweb.service.exception.ServerException;
import com.shockweb.common.log.LogManager;
import com.shockweb.utils.Convert;
import com.shockweb.common.International;
import com.shockweb.common.security.Md5;
import com.shockweb.common.serializable.SerializableObject;
import com.shockweb.common.context.ContextManager;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 配置中心接收请求的handler
 * 
 * @author 彭明华
 * 2018年3月20日 创建
 */
public class CenterServerNettyHandler extends ChannelInboundHandlerAdapter {


	/**
	 * 配置信息
	 */
	ConfigCenterConfig config = null;
	
    /**
     * 线程池
     */
    private ExecutorService cachedThreadPool = null;
    

	/**
	 * 构造方法
	 * @param config
	 */
    public CenterServerNettyHandler(ConfigCenterConfig config,ExecutorService cachedThreadPool){
    	this.config = config;
    	this.cachedThreadPool = cachedThreadPool;
    }
	/**
	 * 被调用次数
	 */
    private static long call = 0;
    
    /**
     * 被调用次数
     * @return
     */
    public static long getCall() {
        return call;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        LogManager.infoLog(this.getClass(),ctx.channel().remoteAddress().toString() + "连接" + ctx.channel().localAddress().toString());
    }
    
    /**
     * 服务端接收客户端发来的数据时触发
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = null;
        //判断数据类型
        if (msg instanceof ByteBuf) {
            buf = (ByteBuf) msg;
        } else {
        	LogManager.errorLog(this.getClass(), new CenterServerException("接收到非法数据,非ByteBuf数据"));
            return;
        }
        call++;
        try {
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
	        cachedThreadPool.execute(new Runnable() {
				@Override
				public void run() {
					try {
						byte operation = DataAgreement.resolutionOperation(data);
			            if (operation == OperationDefine.ALIVE.value()) {
			            	return;
			            }
			            String uuid = DataAgreement.resolutionUUID(data);
			            ContextManager.setUuid(uuid);
			            int offset = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
			            try{
			            	if (operation == OperationDefine.REQ_SYNC_CONFIG.value()) {
				            	ReqSync req = (ReqSync)Convert.convertToObject(data, offset, data.length-offset, ReqSync.class);
				            	if(checkSign(req)){
					            	CenterRoot.getCenterRoot().syncConfig(req.getConfigs());
					            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
				            	}else{
				            		throw new CenterServerException("验签错误，请求数据非法");
				            	}
			            	}else if (operation == OperationDefine.REQ_PUT_CONFIG.value()) {
				            	ReqValue req = (ReqValue)Convert.convertToObject(data, offset, data.length-offset, ReqValue.class);
				            	if(checkSign(req)){
				            		CenterRoot.getCenterRoot().putConfig(req.getGroup(), req.getName(), req.getValue());
					            	ConfigCenterClient client = null;
					            	try{
					            		client = getClient();
					            		client.syncConfig(CenterRoot.getCenterRoot().getConfigs());
					            	}finally{
					            		if(client!=null){
					            			client.close();
					            		}
					            	}
					            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
				            	}else{
				            		throw new CenterServerException("验签错误，请求数据非法");
				            	}
				            } else if (operation == OperationDefine.REQ_GET_CONFIG.value()) {
				            	ReqName req = (ReqName)Convert.convertToObject(data, offset, data.length-offset, ReqName.class);
				            	String config = CenterRoot.getCenterRoot().getConfig(req.getGroup(),req.getName());
				            	sendData(ctx.channel(),OperationDefine.RES_RESULT,uuid,Convert.convertToBytes(config));
				            } else if (operation == OperationDefine.REQ_REMOVE_CONFIG.value()) {
				            	ReqName req = (ReqName)Convert.convertToObject(data, offset, data.length-offset, ReqName.class);
				            	if(checkSign(req)){
				            		CenterRoot.getCenterRoot().removeConfig(req.getGroup(),req.getName());
					            	ConfigCenterClient client = null;
					            	try{
					            		client = getClient();
					            		client.syncConfig(CenterRoot.getCenterRoot().getConfigs());
					            	}finally{
					            		if(client!=null){
					            			client.close();
					            		}
					            	}
					            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
				            	}else{
				            		throw new CenterServerException("验签错误，请求数据非法");
				            	}
				            } else if (operation == OperationDefine.REQ_CLEAR_CONFIG.value()) {
				            	Req req = (Req)Convert.convertToObject(data, offset, data.length-offset, Req.class);
				            	if(checkSign(req)){
				            		CenterRoot.getCenterRoot().clear();
					            	ConfigCenterClient client = null;
					            	try{
					            		client = getClient();
					            		client.syncConfig(CenterRoot.getCenterRoot().getConfigs());
					            	}finally{
					            		if(client!=null){
					            			client.close();
					            		}
					            	}
					            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
				            	}else{
				            		throw new CenterServerException("验签错误，请求数据非法");
				            	}
				            } else if (operation == OperationDefine.REQ_QUERY_CONFIGGROUP.value()) {
				            	Req req = (Req)Convert.convertToObject(data, offset, data.length-offset, Req.class);
				            	if(checkSign(req)){
					            	List<String> groups = CenterRoot.getCenterRoot().getGroups();
					            	sendData(ctx.channel(),OperationDefine.RES_RESULT,uuid,Convert.convertToBytes(groups));
				            	}else{
				            		throw new CenterServerException("验签错误，请求数据非法");
				            	}
				            } else if (operation == OperationDefine.REQ_QUERY_CONFIGNAME.value()) {
				            	ReqGroup req = (ReqGroup)Convert.convertToObject(data, offset, data.length-offset, ReqGroup.class);
				            	if(checkSign(req)){
					            	List<String> names = CenterRoot.getCenterRoot().getNames(req.getGroup());
					            	sendData(ctx.channel(),OperationDefine.RES_RESULT,uuid,Convert.convertToBytes(names));
				            	}else{
				            		throw new CenterServerException("验签错误，请求数据非法");
				            	}
				            }else{
				            	LogManager.errorLog(this.getClass(), new CenterServerException("接收到非法数据,非ByteBuf数据,operation=" + operation));
				            }
			            } catch (Exception e) {
			            	sendException(ctx.channel(),OperationDefine.RES_ERROR,uuid,e);
			            	LogManager.errorLog(this.getClass(), new CenterServerException("接收到非法数据,非ByteBuf数据",e));
			            }
			        } catch (Exception e) {
			        	sendException(ctx.channel(),OperationDefine.RES_ERROR,UUID.randomUUID().toString(),e);
			        	LogManager.errorLog(this.getClass(), new CenterServerException("接收到非法数据,读取operation或uuid出错",e));
			        }
				}
	        });
        } catch (Exception e) {
        	sendException(ctx.channel(),OperationDefine.RES_ERROR,UUID.randomUUID().toString(),e);
        	LogManager.errorLog(this.getClass(), new CenterServerException("接收到非法数据,readBytes出错",e));
        } finally {
        	if(buf != null){
        		buf.release();//手动释放缓冲区
        	}
        }
    }
    
    /**
     * 获取配置中心的连接
     * @return
     * @throws ClientException 
     */
    private ConfigCenterClient getClient() throws ClientException{
    	ConfigCenterClient client = new ConfigCenterClient(config.getClientTimeOut(),config.getClientConnectTimeOut(),config.getClientSleepTime(),config.getClientIdleStateTime(),config.getSecretKey());
    	client.connect(config.getMirrorUrl());
    	return client;
    }
    
    
    
    
    /**
     * 验证请求签名
     * @param req
     * @return
     */
    private boolean checkSign(IReq req)throws CenterServerException{
    	if(config==null || config.getSecretKey()==null){
    		return true;
    	}
    	if(req==null){
    		throw new CenterServerException("数据为null"); 
    	}else{
    		String sign = null;
			try {
				sign = SerializableObject.base64EncodeString(Md5.md5((req.toString() + config.getSecretKey()).getBytes(International.CHARSET)));
			} catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
				throw new CenterServerException("验签错误",e);
			}
    		if(sign.equals(req.getSign())){
    			return true;
    		}else{
    			return false;
    		}
    	}
    }
    
    /**
     * 将错误信息写入通道
     * @param channel
     * @param operation
     * @param data
     * @throws UnsupportedEncodingException 
     */
    public static void sendException(Channel channel, OperationDefine operation,String uuid,Exception e) {
    	String message = LogManager.exceptionToString(e);
    	try {
	    	if(message!=null){
	    		sendData(channel,operation,uuid,message.getBytes(International.CHARSET));
	    	}else{
	    		sendData(channel,operation,uuid,null);
	    	}
		}catch(Exception e1) {
			LogManager.errorLog(CenterServerNettyHandler.class, new ServerException("sendException",e1));
		}
    }
    
    /**
     * 将结果数据写入通道
     * @param channel
     * @param operation
     * @param data
     * @throws UnsupportedEncodingException 
     */
    public static void sendData(Channel channel, OperationDefine operation,String uuid,byte[] data) throws UnsupportedEncodingException{
        if (channel != null && channel.isOpen() && channel.isActive() && channel.isWritable()) {
            ByteBufAllocator alloc = channel.alloc();
        	int len = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
            if (data != null) {
                len = len + data.length;
            }
            ByteBuf buf = alloc.buffer(len);
            buf.writeByte(operation.value());
            buf.writeBytes(uuid.getBytes(International.CHARSET));
            if(data!=null){
            	buf.writeBytes(data);
            }
        	channel.writeAndFlush(buf).addListener(
            	new ChannelFutureListener() {
                	public void operationComplete(ChannelFuture f)throws Exception {
                        if (!f.isSuccess()) {
                            LogManager.errorLog(CenterServerNettyHandler.class, "Message sending failed",f.cause());
                        }
                    }
                });

        }
    }
    
    /**
     * @see ChannelInboundHandlerAdapter#exceptionCaught(ChannelHandlerContext, Throwable)
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.disconnect();// 服务端发送异常时关闭客户端
    }

    /**
     * @see ChannelInboundHandlerAdapter#channelReadComplete(ChannelHandlerContext)
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   		ctx.flush();
    }

    /**
     * 当客户端主动断开服务端的链接后，这个通道就是不活跃的。 也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }
    
}
